home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Cream of the Crop 26
/
Cream of the Crop 26.iso
/
os2
/
pvm34b3.zip
/
pvm34b3
/
pvm3
/
hoster
/
pvmwinhoster.c
< prev
next >
Wrap
C/C++ Source or Header
|
1997-07-22
|
17KB
|
783 lines
static char rcsid[] =
"$Id: pvmwinhoster.c,v 1.2 1997/07/09 13:28:48 pvmsrc Exp $";
/*
* PVM version 3.4: Parallel Virtual Machine System
* University of Tennessee, Knoxville TN.
* Oak Ridge National Laboratory, Oak Ridge TN.
* Emory University, Atlanta GA.
* Authors: J. J. Dongarra, G. E. Fagg, M. Fischer
* G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
* P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
* (C) 1997 All Rights Reserved
*
* NOTICE
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby granted
* provided that the above copyright notice appear in all copies and
* that both the copyright notice and this permission notice appear in
* supporting documentation.
*
* Neither the Institutions (Emory University, Oak Ridge National
* Laboratory, and University of Tennessee) nor the Authors make any
* representations about the suitability of this software for any
* purpose. This software is provided ``as is'' without express or
* implied warranty.
*
* PVM version 3 was funded in part by the U.S. Department of Energy,
* the National Science Foundation and the State of Tennessee.
*/
/*
* pvmwin_hoster.c
*
* acts like pvmd' uses rexec right now.
* will get rsh for WIN95 soon
*
* /9/24/1996 Markus Fischer
*/
#ifdef WIN32
#include "..\src\pvmwin.h"
#include <pvmproto.h>
#endif
#ifndef RSHCOMMAND
#define RSHCOMMAND "/usr/ucb/rsh"
#endif
#ifdef HASSTDLIB
#include <stdlib.h>
#endif
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <errno.h>
#ifndef WIN32
#include <pwd.h>
#include <netdb.h>
#endif
#ifdef SYSVSTR
#include <string.h>
#define CINDEX(s,c) strchr(s,c)
#else
#include <strings.h>
#define CINDEX(s,c) index(s,c)
#endif
#include "../include/pvm3.h"
#include "../include/pvmsdpro.h"
/* if > 1, uses parallel startup strategy */
#ifndef RSHNPLL
#define RSHNPLL 5
#endif
#ifndef RSHTIMEOUT
#define RSHTIMEOUT 100
#endif
#ifndef max
#define max(a,b) ((a)>(b)?(a):(b))
#endif
#ifndef min
#define min(a,b) ((a)<(b)?(a):(b))
#endif
#define TALLOC(n,t,g) (t*)malloc((n)*sizeof(t))
#define FREE(p) free((char *)p)
#define STRALLOC(s) strcpy(TALLOC(strlen(s)+1,char,"str"),s)
#define LISTPUTAFTER(o,n,f,r) \
{ (n)->f=(o)->f; (n)->r=o; (o)->f->r=n; (o)->f=n; }
#define LISTPUTBEFORE(o,n,f,r) \
{ (n)->r=(o)->r; (n)->f=o; (o)->r->f=n; (o)->r=n; }
#define LISTDELETE(e,f,r) \
{ (e)->f->r=(e)->r; (e)->r->f=(e)->f; (e)->r=(e)->f=0; }
#define TVCLEAR(tvp) ((tvp)->tv_sec = (tvp)->tv_usec = 0)
#define TVISSET(tvp) ((tvp)->tv_sec || (tvp)->tv_usec)
#define TVXLTY(xtv, ytv) \
((xtv)->tv_sec < (ytv)->tv_sec || \
((xtv)->tv_sec == (ytv)->tv_sec && (xtv)->tv_usec < (ytv)->tv_usec))
#define TVXADDY(ztv, xtv, ytv) \
if (((ztv)->tv_usec = (xtv)->tv_usec + (ytv)->tv_usec) < 1000000) { \
(ztv)->tv_sec = (xtv)->tv_sec + (ytv)->tv_sec; \
} else { \
(ztv)->tv_usec -= 1000000; \
(ztv)->tv_sec = (xtv)->tv_sec + (ytv)->tv_sec + 1; \
}
#define TVXSUBY(ztv, xtv, ytv) \
if ((xtv)->tv_usec >= (ytv)->tv_usec) { \
(ztv)->tv_sec = (xtv)->tv_sec - (ytv)->tv_sec; \
(ztv)->tv_usec = (xtv)->tv_usec - (ytv)->tv_usec; \
} else { \
(ztv)->tv_sec = (xtv)->tv_sec - (ytv)->tv_sec - 1; \
(ztv)->tv_usec = (xtv)->tv_usec + 1000000 - (ytv)->tv_usec; \
}
/*
* for keeping state assoc. with a host
*/
struct hst {
int h_tid;
char *h_name;
char *h_login;
char *h_sopts;
int h_flag;
#define HST_PASSWORD 1 /* ask for a password */
#define HST_MANUAL 2 /* do manual startup */
char *h_cmd;
char *h_result;
};
static char pvmtxt[1024]; /* scratch for error log */
int debugmask = 0;
extern char* username;
char *getenv();
int main (argc, argv)
int argc;
char **argv;
{
static int init=0;
static int needed=1;
// DebugBreak();
pvm_mytid();
pvm_reg_hoster();
pvm_setopt(PvmResvTids, 1);
while (pvm_recv(-1, SM_STHOST) > 0) {
AllocConsole();
GetStdHandle(STD_OUTPUT_HANDLE);
if (init)
printf("***need more ? ***\n");
hoster();
if (!init) {
printf("*** PVM pvmd starter ***\n");
printf("*** Here you can see what your 'add command' is doing ***\n");
printf("*** Also, if you are required to type in passwords :-( ***\n");
printf("*** this is the right place :-) ***\n");
printf("*** It will pop up the next time you have to add more ***\n");
printf("*** slaves and will vanish therefore ... ***\n");
Sleep(12000);
printf("*** Now ! ..... later ... ***\n");
Sleep(1000);
FreeConsole();
init=1;
}
}
pvm_exit();
exit(0);
return 0;
}
/* hoster()
*
* Unpack host table from message, attempt to start 'em up,
* send reply message.
*/
hoster()
{
int num;
int i;
struct hst **hostlist;
struct hst *hp;
char *p;
char sopts[64];
char lognam[256];
char cmd[512];
int fromtid;
int wid;
/*
* unpack the startup message
*/
pvm_bufinfo(pvm_getrbuf(), (int *)0, (int *)0, &fromtid);
pvm_unpackf("%d", &num);
wid = pvm_getmwid(pvm_getrbuf());
printf("hoster() %d to start, wait id %d\n", num, wid);
fflush(stdout);
if (num > 0) {
hostlist = TALLOC(num, struct hst *, "xxx");
for (i = 0; i < num; i++) {
hp = TALLOC(1, struct hst, "xxx");
hostlist[i] = hp;
hp->h_flag = 0;
hp->h_result = 0;
if (pvm_unpackf("%d %s %s %s", &hp->h_tid, sopts, lognam, cmd)) {
printf("hoster() bad message format\n");
pvm_exit();
exit(1);
}
hp->h_sopts = STRALLOC(sopts);
hp->h_login = STRALLOC(lognam);
hp->h_cmd = STRALLOC(cmd);
printf("%d. t%x %s so=\"%s\"\n", i,
hp->h_tid,
hp->h_login,
hp->h_sopts);
if (p = CINDEX(hp->h_login, '@')) {
hp->h_name = STRALLOC(p + 1);
*p = 0;
p = STRALLOC(hp->h_login);
FREE(hp->h_login);
hp->h_login = p;
} else {
hp->h_name = hp->h_login;
hp->h_login = 0;
}
if (!strcmp(hp->h_sopts, "pw"))
hp->h_flag |= HST_PASSWORD;
if (!strcmp(hp->h_sopts, "ms"))
hp->h_flag |= HST_MANUAL;
}
}
/*
* do it
*/
#if RSHNPLL > 1
pl_startup(num, hostlist);
#else /*RSHNPLL > 1*/
for (i = 0; i < num; i++)
pl_startup(1,hostlist++);
#endif /*RSHNPLL > 1*/
/*
* send results back to pvmd
*/
pvm_packf("%+ %d", PvmDataFoo, num);
for (i = 0; i < num; i++) {
pvm_packf("%d", hostlist[i]->h_tid);
pvm_packf("%s", hostlist[i]->h_result
? hostlist[i]->h_result : "PvmDSysErr");
}
/*
printf("hoster() sending back host table\n");
*/
pvm_setmwid(pvm_getsbuf(), wid);
pvm_send(fromtid, SM_STHOSTACK);
return 0;
}
#if RSHNPLL > 1
/********************************************
* this is the new (parallel) startup code *
* *
********************************************/
struct slot {
struct slot *s_link, *s_rlink; /* free/active list */
struct hst *s_hst; /* host table entry */
struct timeval s_bail; /* timeout time */
int s_rfd, s_wfd, s_efd; /* slave stdin/out/err */
char s_buf[256]; /* config reply line */
int s_len; /* length of s_buf */
};
static struct slot slots[RSHNPLL+2]; /* state var/context for each slot */
static struct slot *slfree = 0; /* free list of slots */
close_slot(sp)
struct slot *sp;
{
if (sp->s_wfd != -1)
(void)_close(sp->s_wfd);
if (sp->s_rfd != -1)
(void)_close(sp->s_rfd);
if (sp->s_efd != -1)
(void)_close(sp->s_efd);
LISTDELETE(sp, s_link, s_rlink);
LISTPUTBEFORE(slfree, sp, s_link, s_rlink);
return 0;
}
pl_startup(num, hostlist)
int num;
struct hst **hostlist;
{
int nxth = 0; /* next host in list to start */
struct slot *slact = 0; /* active list of slots */
struct hst *hp;
struct slot *sp, *sp2;
struct timeval tnow;
struct timeval tout;
struct fd_set rfds;
int nfds;
int i;
int n;
char *p;
char ebuf[256]; /* for reading stderr */
/* init slot free list */
slfree = &slots[RSHNPLL+1];
slfree->s_link = slfree->s_rlink = slfree;
slact = &slots[RSHNPLL];
slact->s_link = slact->s_rlink = slact;
for (i = RSHNPLL; i-- > 0; ) {
LISTPUTAFTER(slfree, &slots[i], s_link, s_rlink);
}
/*
* keep at this until all hosts in table are completed
*/
for (; ; ) {
/*
* if empty slots, start on new hosts
*/
for (; ; ) {
/* find a host for slot */
if (slfree->s_link != slfree && nxth < num)
hp = hostlist[nxth++];
else
break;
sp = slfree->s_link;
LISTDELETE(sp, s_link, s_rlink);
sp->s_hst = hp;
sp->s_len = 0;
if (debugmask) {
fprintf(stderr, "pl_startup() trying %s\n", hp->h_name);
}
phase1(sp);
if (hp->h_result) {
/* error or fully started (manual startup) */
LISTPUTBEFORE(slfree, sp, s_link, s_rlink);
} else {
/* partially started */
LISTPUTBEFORE(slact, sp, s_link, s_rlink);
gettimeofday(&sp->s_bail, (struct timezone*)0);
tout.tv_sec = RSHTIMEOUT;
tout.tv_usec = 0;
TVXADDY(&sp->s_bail, &sp->s_bail, &tout);
}
}
/* if no hosts in progress, we are finished */
if (slact->s_link == slact)
break;
/*
* until next timeout, get output from any slot
*/
FD_ZERO(&rfds);
nfds = 0;
TVCLEAR(&tout);
gettimeofday(&tnow, (struct timezone*)0);
for (sp = slact->s_link; sp != slact; sp = sp->s_link) {
if (TVXLTY(&sp->s_bail, &tnow)) {
fprintf(stderr, "pl_startup() %s timed out after %d secs\n",
sp->s_hst->h_name, RSHTIMEOUT);
sp->s_hst->h_result = STRALLOC("PvmCantStart");
sp2 = sp->s_rlink;
close_slot(sp);
sp = sp2;
continue;
}
if (!TVISSET(&tout) || TVXLTY(&sp->s_bail, &tout))
tout = sp->s_bail;
if (sp->s_rfd >= 0)
FD_SET(sp->s_rfd, &rfds);
if (sp->s_rfd > nfds)
nfds = sp->s_rfd;
if (sp->s_efd >= 0)
FD_SET(sp->s_efd, &rfds);
if (sp->s_efd > nfds)
nfds = sp->s_efd;
}
if (slact->s_link == slact)
break;
nfds++;
if (TVXLTY(&tnow, &tout)) {
TVXSUBY(&tout, &tout, &tnow);
} else {
TVCLEAR(&tout);
}
if (debugmask) {
fprintf(stderr, "pl_startup() select timeout is %d.%06d\n",
tout.tv_sec, tout.tv_usec);
}
if ((n = select(nfds, &rfds, (fd_set*)0, (fd_set*)0, &tout)) == -1) {
if (errno != EINTR) {
pvmlogperror("work() select");
pvmbailout(0);
}
}
if (debugmask) {
(void)fprintf(stderr, "pl_startup() select returns %d\n", n);
}
if (n < 1) {
if (n == -1 && errno != EINTR) {
pvmlogperror("pl_startup() select");
pvmbailout(0); /* XXX this is too harsh */
}
continue;
}
/*
* check for response on stdout or stderr of any slave.
*/
for (sp = slact->s_link; sp != slact; sp = sp->s_link) {
/*
* stdout ready. get complete line then scan config info from it.
*/
if (sp->s_rfd >= 0 && FD_ISSET(sp->s_rfd, &rfds)) {
n = win32_read_socket(sp->s_rfd, sp->s_buf + sp->s_len,
sizeof(sp->s_buf) - sp->s_len);
if (n > 0) {
sp->s_len += n;
if (sp->s_len >= sizeof(sp->s_buf)) {
fprintf(stderr, "pl_startup() pvmd@%s: big read\n",
sp->s_hst->h_name);
sp->s_hst->h_result = STRALLOC("PvmCantStart");
}
sp->s_buf[sp->s_len] = 0;
if (p = CINDEX(sp->s_buf + sp->s_len - n, '\n')) {
if (debugmask) {
fprintf(stderr, "pvmd@%s: %s",
sp->s_hst->h_name, sp->s_buf);
}
*p = 0;
sp->s_hst->h_result = STRALLOC(sp->s_buf);
}
} else {
if (n) {
fprintf(stderr, "pl_startup() pvmd@%s\n",
sp->s_hst->h_name);
perror("");
} else {
fprintf(stderr, "pl_startup() pvmd@%s: EOF\n",
sp->s_hst->h_name);
}
sp->s_hst->h_result = STRALLOC("PvmCantStart");
}
if (sp->s_hst->h_result) {
sp2 = sp->s_rlink;
close_slot(sp);
sp = sp2;
continue;
}
}
/*
* response on stderr. log prefixed by remote's host name.
*/
if (sp->s_efd >= 0 && FD_ISSET(sp->s_efd, &rfds)) {
if ((n = win32_read_socket(sp->s_efd, ebuf, sizeof(ebuf)-1)) > 0) {
char *p = ebuf, *q, c;
ebuf[n] = 0;
fprintf(stderr, "pvmd@%s: ", sp->s_hst->h_name);
while (c = *p++ & 0x7f) {
if (isprint(c))
fputc(c, stderr);
else {
fputc('^', stderr);
fputc((c + '@') & 0x7f, stderr);
}
}
fputc('\n', stderr);
} else {
(void)_close(sp->s_efd);
sp->s_efd = -1;
}
}
}
}
return 0;
}
phase1(sp)
struct slot *sp;
{
struct hst *hp;
char *hn;
char *av[16]; /* for rsh args */
int ac;
char buf[512];
char do_cmd[128];
int pid = -1; /* pid of rsh */
char *p;
char *val=0;
FILE *faked_stream;
#ifndef NOREXEC
struct servent *se;
static u_short execport = 0;
if (!execport) {
if (!(se = getservbyname("exec", "tcp"))) {
fprintf(stderr, "phase1() can't getservbyname(): %s\n", "exec");
pvmbailout(0);
}
execport = se->s_port;
#ifndef WIN32
endservent();
#endif
}
#endif
hp = sp->s_hst;
hn = hp->h_name;
sp->s_rfd = sp->s_wfd = sp->s_efd = -1;
/*
* XXX manual startup hack... this is if we can't use rexec or rsh
*/
if (hp->h_flag & HST_MANUAL) {
fprintf(stderr, "*** Manual startup ***\n");
fprintf(stderr, "Login to \"%s\" and type:\n", hn);
fprintf(stderr, "%s\n", hp->h_cmd);
/* get version */
fprintf(stderr, "Type response: ");
fflush(stderr);
if (!(fgets(buf, sizeof(buf), stdin))) {
fprintf(stderr, "host %s read error\n", hn);
goto oops;
}
p = buf + strlen(buf) - 1;
if (*p == '\n')
*p = 0;
hp->h_result = STRALLOC(buf);
fprintf(stderr, "Thanks\n");
fflush(stderr);
return 0;
}
/*
* XXX end manual startup hack
*/
if (!(hp->h_flag & HST_PASSWORD)) { /* use rsh to start */
int wpfd[2], rpfd[2], epfd[2];
int i;
if (debugmask) {
fprintf(stderr, "phase1() trying rsh to %s\n", hn);
}
/* fork an rsh to startup the slave pvmd */
#ifdef IMA_TITN
if (socketpair(AF_UNIX, SOCK_STREAM, 0, wpfd) == -1
|| socketpair(AF_UNIX, SOCK_STREAM, 0, rpfd) == -1
|| socketpair(AF_UNIX, SOCK_STREAM, 0, epfd) == -1) {
pvmlogperror("phase1() socketpair");
goto oops;
}
#else
#ifndef WIN32
if (pipe(wpfd) == -1 || pipe(rpfd) == -1 || pipe(epfd) == -1) {
pvmlogperror("phase1() pipe");
goto oops;
}
#endif
#endif
if (debugmask) {
fprintf(stderr, "phase1() pipes: %d %d %d %d %d %d\n",
wpfd[0], wpfd[1], rpfd[0], rpfd[1], epfd[0], epfd[1]);
}
#ifndef WIN32
if ((pid = fork()) == -1) {
pvmlogperror("phase1() fork");
pvmbailout(0);
}
if (!pid) {
(void)dup2(wpfd[0], 0);
(void)dup2(rpfd[1], 1);
(void)dup2(epfd[1], 2);
for (i = getdtablesize(); --i > 2; )
(void)close(i);
#else
{
#endif
ac = 0;
if (getenv("PVM_RSH")) /* do we have a setting for rsh ?*/
av[ac++] = getenv("PVM_RSH");
else
av[ac++] = RSHCOMMAND;
av[ac++] = hn;
if (hp->h_login) {
av[ac++] = "-l";
av[ac++] = hp->h_login;
}
else {
hp->h_login=username;
av[ac++] = "-l";
av[ac++] = hp->h_login;
}
av[ac++] = hp->h_cmd;
// av[ac++] = "-u";
// av[ac++] = username;
av[ac++] = 0;
if (debugmask) {
for (ac = 0; av[ac]; ac++)
fprintf(stderr, "av[%d]=\"%s\" ", ac, av[ac]);
fputc('\n', stderr);
}
#ifndef WIN32
execvp(av[0], av);
fputs("phase1() execvp failed\n", stderr);
fflush(stderr);
_exit(1);
#else
strcpy(do_cmd,av[0]);
for (ac = 1; av[ac]; ac++) {
strcat(do_cmd," ");
strcat(do_cmd,av[ac]);
}
faked_stream=_popen(do_cmd,"r");
if (faked_stream != NULL)
{
while (1) {
fgets(buf,100,faked_stream);
if (strstr(buf,"ddpro")) {
if (!val)
val = malloc(sizeof (buf) * sizeof(char));
strcpy(val,buf);
sp->s_hst->h_result=val;
break;
/* we got the message */
}
if (strcmp(buf,"PvmDupHost")){
if (!val)
val = malloc(sizeof (buf) * sizeof(char));
strcpy(val,buf);
sp->s_hst->h_result=val;
break;
}
}
}
// how can we close this ???
// _pclose(faked_stream);
#endif
}
#ifndef WIN32
(void)close(wpfd[0]);
(void)close(rpfd[1]);
(void)close(epfd[1]);
sp->s_wfd = wpfd[1];
sp->s_rfd = rpfd[0];
sp->s_efd = epfd[0];
#endif
} else { /* use rexec to start */
#ifdef NOREXEC
fprintf(stderr, "slconfg() sorry, no rexec()\n");
goto oops;
#else
if (debugmask) {
fprintf(stderr, "phase1() rexec \"%s\"\n", hp->h_cmd);
}
#ifndef WIN32
if ((sp->s_wfd = sp->s_rfd = rexec(&hn, execport,
(hp->h_login ? hp->h_login : username),
(char*)0, hp->h_cmd, &sp->s_efd))
== -1) {
fprintf(stderr, "phase1() rexec failed for host %s\n", hn);
goto oops;
}
#endif
sp->s_hst->h_result = malloc (512 * sizeof(char));
if (!rexec(hn,(hp->h_login ? hp->h_login : username),
hp->h_cmd,sp->s_hst->h_result))
{
fprintf(stderr, "phase1() rexec failed for host %s\n", hn);
goto oops;
}
#endif
}
return 0;
oops:
hp->h_result = STRALLOC("PvmCantStart");
oops2:
if (sp->s_wfd != -1)
_close(sp->s_wfd);
if (sp->s_rfd != -1)
_close(sp->s_rfd);
if (sp->s_efd != -1)
_close(sp->s_efd);
sp->s_wfd = sp->s_rfd = sp->s_efd = -1;
return 1;
}
#endif /*RSHNPLL > 1*/